home *** CD-ROM | disk | FTP | other *** search
# Source Generated with Decompyle++
# File: in.pyc (Python 2.6)

'''
Code used to identify a package based on one of the following:
* A process ID
* A file name
* A desktop file
* A binary package name
'''
- import sys
- import os
- import socket
- import subprocess
# Root of the dpkg database on disk.
DPKGDIR = '/var/lib/dpkg'
# Directory holding the per-package '<name>.list' files (one owned path
# per line).
INFODIR = os.path.join(DPKGDIR, 'info')
# Flat file describing every package known to dpkg, one stanza per package.
STATUSFILE = os.path.join(DPKGDIR, 'status')
-
class PackageNotFoundError(Exception):
    """Raised when no package can be identified for the given input.

    Every lookup step in this module (window -> process ID -> executable
    -> binary package -> package record) raises this single exception type
    on failure, so callers only need to catch one error.
    """
-
-
def _get_pkg(filename, infodir=None):
    """Find the binary package associated with the given filename.

    This scans the list files in /var/lib/dpkg/info, since it is faster
    than 'dpkg-query -S'.

    Parameters:
        filename: path to look up; it must match a whole line of a
            '.list' file exactly.
        infodir: directory containing the '<package>.list' files.
            Defaults to the module-level INFODIR, looked up at call time.

    Returns:
        The name of the first list file (minus the '.list' suffix) that
        contains `filename`, or None when no list file mentions it.

    Raises:
        OSError / IOError: if the directory or a list file cannot be read.
    """
    if infodir is None:
        infodir = INFODIR

    suffix = '.list'
    for listfile in os.listdir(infodir):
        if not listfile.endswith(suffix):
            continue

        # Use a context manager so each list file is closed promptly
        # instead of leaking one descriptor per installed package.
        with open(os.path.join(infodir, listfile), 'r') as handle:
            owned_paths = handle.read().splitlines(False)

        if filename in owned_paths:
            return listfile[:-len(suffix)]

    return None
-
-
-
class PackageInfo(object):
    """Description of one binary package from the dpkg status database.

    Attributes:
        binarypackage: binary package name.
        sourcepackage: source package name; falls back to the binary
            package name when no source name is given.
        provides: set of names this package provides.
        version: version string.
        architecture: architecture string.
        status: raw 'Status' field (three whitespace-separated words), or
            None when unknown.
        dependencies: set of package names taken from the 'Depends' and
            'Pre-Depends' fields (version constraints removed, alternatives
            flattened).
        names: every name this package answers to (binary name + provides).

    Instances are normally obtained through the from*() class methods.
    """

    def __init__(self, binarypackage, sourcepackage, provides, version,
                 architecture, status, dependencies):
        self.binarypackage = binarypackage
        # A stanza without a 'Source' field means the source package has
        # the same name as the binary package.
        self.sourcepackage = sourcepackage or binarypackage
        self.provides = set(provides)
        self.version = version
        self.architecture = architecture
        self.status = status
        self.dependencies = set(dependencies)
        self.names = set(self.provides)
        self.names.add(self.binarypackage)

    @property
    def installed(self):
        """True unless the status is missing or its third word is
        'config-files' or 'not-installed'."""
        if self.status:
            state = self.status.split()[2]
            return state not in ('config-files', 'not-installed')
        return False

    @property
    def shortstatus(self):
        """Two-letter status code: the first letters of the first and third
        status words, or None when there is no status."""
        if self.status:
            sinfo = self.status.split()
            return sinfo[0][0] + sinfo[2][0]
        return None

    def __repr__(self):
        return "<PackageInfo '%s_%s_%s'>" % (
            self.binarypackage, self.version, self.architecture)

    @classmethod
    def fromXID(cls, xid=None, logger=None):
        """Return a PackageInfo instance corresponding to a window.

        This is performed by calling xprop to get WM_CLIENT_MACHINE
        and _NET_WM_PID properties of a window.  If WM_CLIENT_MACHINE
        is our hostname, and _NET_WM_PID is set, then chain to
        fromProcessID().

        If no XID is passed, xprop works in 'picker' mode.

        Parameters:
            xid: window ID handed to 'xprop -id', or None.
            logger: optional logging.Logger-like object.

        Raises:
            PackageNotFoundError: if xprop cannot be run or fails, the
                window belongs to another host, or no usable process ID
                is reported.
        """
        cmdline = ['xprop', '-notype']
        if xid is not None:
            cmdline.extend(['-id', str(xid)])
        cmdline.extend(['WM_CLIENT_MACHINE', '_NET_WM_PID'])
        if logger:
            logger.debug('Executing %r', cmdline)

        try:
            # universal_newlines makes the output text rather than bytes,
            # so the string parsing below also works on Python 3.
            p = subprocess.Popen(cmdline,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE,
                                 close_fds=True,
                                 universal_newlines=True)
            stdout, _stderr = p.communicate()
        except (OSError, IOError):
            if logger:
                logger.exception('Could not find process ID')
            raise PackageNotFoundError('Could not find process ID')

        if p.returncode != 0:
            raise PackageNotFoundError('Could not find process ID')

        # Lines without ' = ' (for instance a property that is not set)
        # are skipped; feeding them to dict() would raise ValueError
        # instead of the documented PackageNotFoundError.
        props = dict(line.split(' = ', 1)
                     for line in stdout.splitlines(False)
                     if ' = ' in line)
        if logger:
            logger.debug('WM_CLIENT_MACHINE = %s, _NET_WM_PID = %s',
                         props.get('WM_CLIENT_MACHINE'),
                         props.get('_NET_WM_PID'))

        # xprop prints string values wrapped in double quotes.
        hostname = '"%s"' % socket.gethostname()
        if (props.get('WM_CLIENT_MACHINE', hostname) != hostname
                or '_NET_WM_PID' not in props):
            raise PackageNotFoundError('Could not find process ID')

        try:
            pid = int(props['_NET_WM_PID'])
        except ValueError:
            raise PackageNotFoundError('Could not find process ID')

        if logger:
            logger.info('Process ID for selected window is %d', pid)

        return cls.fromProcessID(pid, logger)

    @classmethod
    def fromProcessID(cls, pid, logger=None):
        """Return a PackageInfo instance corresponding to a process ID.

        This is performed by looking up the executable name in the /proc
        filesystem, then chaining to fromFilename().

        Raises:
            PackageNotFoundError: if /proc/<pid>/exe cannot be read or the
                executable has been deleted from disk.
        """
        if logger:
            logger.debug('Looking up executable for process %d', pid)

        try:
            filename = os.readlink('/proc/%d/exe' % pid)
        except OSError:
            if logger:
                logger.exception(
                    'Could not find executable for process %d', pid)
            raise PackageNotFoundError(
                'Could not find executable for process %d' % pid)

        if filename.endswith(' (deleted)'):
            # Guard the logger like every other call site; it is optional.
            if logger:
                logger.error(
                    'Process %d is running deleted executable "%s"',
                    pid, filename)
            raise PackageNotFoundError(
                'Process %d is running deleted executable "%s"'
                % (pid, filename))

        if logger:
            logger.info('Executable for process %d is "%s"', pid, filename)

        # Strip these mount prefixes so the path matches the dpkg file
        # lists (presumably live-session read-only mounts -- TODO confirm).
        if filename.startswith('/rofs'):
            filename = filename[len('/rofs'):]
        elif filename.startswith('/filesystem.squashfs'):
            filename = filename[len('/filesystem.squashfs'):]

        return cls.fromFilename(filename, logger)

    @classmethod
    def fromDesktopFile(cls, filename, logger=None):
        """Return a PackageInfo instance corresponding to a Desktop file.

        This is performed by looking for the executable name in the 'exec'
        line of the desktop file, then chains to fromFilename().

        This function should be used instead of fromFilename() because
        the desktop file might be a customised one in the user's home
        directory, but still points to an installed application.

        Not implemented yet: always raises NotImplementedError.
        """
        raise NotImplementedError

    @classmethod
    def fromFilename(cls, filename, logger=None):
        """Return a PackageInfo instance corresponding to a file.

        This is performed by finding the package that owns the file
        with _get_pkg(), and then chaining to fromPackageName() to
        fill in the PackageInfo instance.

        Raises:
            PackageNotFoundError: if the file lists cannot be read or no
                package owns the file.
        """
        if logger:
            logger.debug('Looking up binary package name for file "%s"',
                         filename)

        try:
            package = _get_pkg(filename)
        except (OSError, IOError):
            if logger:
                logger.exception(
                    'Could not find binary package for file "%s"', filename)
            raise PackageNotFoundError(
                'Could not find binary package for file "%s"' % filename)

        if package is None:
            raise PackageNotFoundError(
                'Could not look up binary package for file "%s"' % filename)

        if logger:
            logger.info('Binary package for file "%s" is "%s"',
                        filename, package)

        return cls.fromPackageName(package, logger)

    @classmethod
    def fromPackageName(cls, package, logger=None):
        """Return a PackageInfo instance for a particular binary package name.

        The first record whose binary name or provided names contain
        `package` is returned.

        Raises:
            PackageNotFoundError: if no record matches.
        """
        if logger:
            logger.debug('Looking up package information for "%s"', package)

        for info in cls._iterPackages():
            if package in info.names:
                break
        else:
            raise PackageNotFoundError(
                'Could not look up package info for "%s"' % package)

        if logger:
            logger.debug('Package info for %s is %r', package, info)

        return info

    @classmethod
    def fromPackageNames(cls, packages, logger=None):
        """Iterate through PackageInfo instances that match names in packages.

        This is an optimisation of fromPackageName() for when you want
        multiple packages: the database is read once and every record
        sharing at least one name with `packages` is yielded.
        """
        packages = set(packages)
        for info in cls._iterPackages():
            if packages & info.names:
                if logger:
                    logger.debug('Package %r matched', info)
                yield info

    @staticmethod
    def _dependencyNames(value):
        """Yield bare package names from a Depends-style field value.

        Entries are separated by ', ' and alternatives by '|'; only the
        first word of each alternative is kept, which drops version
        constraints.  Empty entries are skipped.
        """
        for entry in value.strip().split(', '):
            for alternative in entry.split('|'):
                words = alternative.split()
                if words:
                    yield words[0]

    @classmethod
    def _iterPackages(cls, filename=None):
        """Iterate through the list of packages in the DPKG database.

        Parameters:
            filename: status file to parse.  Defaults to the module-level
                STATUSFILE, looked up at call time.

        Yields:
            One instance of `cls` per stanza that has a 'Package' field.
            Stanzas are separated by blank lines; a final stanza without a
            trailing blank line is yielded too.
        """
        if filename is None:
            filename = STATUSFILE

        def blank_record():
            # Keys match the constructor's keyword arguments.
            return dict(binarypackage=None, sourcepackage=None, provides=[],
                        version=None, architecture=None, status=None,
                        dependencies=[])

        record = blank_record()
        # The context manager closes the file even if the consumer stops
        # iterating early.
        with open(filename, 'r') as statusfile:
            for line in statusfile:
                if line == '\n':
                    # End of stanza.  Consecutive blank lines would
                    # otherwise produce a nameless record.
                    if record['binarypackage'] is not None:
                        yield cls(**record)
                    record = blank_record()
                elif line.startswith('Package: '):
                    record['binarypackage'] = line[len('Package: '):].strip()
                elif line.startswith('Status: '):
                    record['status'] = line[len('Status: '):].strip()
                elif line.startswith('Architecture: '):
                    record['architecture'] = (
                        line[len('Architecture: '):].strip())
                elif line.startswith('Source: '):
                    record['sourcepackage'] = line[len('Source: '):].strip()
                elif line.startswith('Version: '):
                    record['version'] = line[len('Version: '):].strip()
                elif line.startswith('Provides: '):
                    record['provides'].extend(
                        name.strip()
                        for name in line[len('Provides: '):].split(', ')
                        if name.strip())
                elif line.startswith('Depends: '):
                    record['dependencies'].extend(
                        cls._dependencyNames(line[len('Depends: '):]))
                elif line.startswith('Pre-Depends: '):
                    record['dependencies'].extend(
                        cls._dependencyNames(line[len('Pre-Depends: '):]))

        if record['binarypackage'] is not None:
            yield cls(**record)
-
-